Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a Receiver.close() method #348

Merged
merged 4 commits into from
Jan 16, 2025
Merged

Conversation

shsms
Copy link
Contributor

@shsms shsms commented Nov 29, 2024

This method would allow individual receivers to be closed, without affecting the underlying channel, if there is one.

@shsms shsms requested a review from a team as a code owner November 29, 2024 14:57
@github-actions github-actions bot added part:channels Affects channels implementation part:synchronization Affects the synchronization of multiple sources (`select`, `merge`) part:core Affects the core types (`Sender`, `Receiver`, exceptions, etc.) part:utilities Affects the utility receivers (`Timer`, `Event`, `FileWatcher`) part:experimental Affects the experimental package labels Nov 29, 2024
@shsms
Copy link
Contributor Author

shsms commented Nov 29, 2024

Still need to add tests.

Copy link
Contributor

@llucax llucax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM but the CI is failing, and this reminds me that it would be good to add tests at least for the receivers that actually do something when close() is called.

And oh, yes! As usual names, names, names 😆

TL;DR: All good the name actually, just some other comments about future directions.

I was going to suggest to rename close() to stop(), for once to match the ReceiverStoppedError but also to match the upcoming Service interface.

But I see close actually wait for the current buffer to be consumed, my idea for stop() would be that stuff gets just dropped.

Maybe to make it play better with Service we can keep close() as you implemented it, as a "graceful" shutdown, and when we introduce Service we can use cancel() to drop the current buffers. Then maybe stop() can take an option to decide if close() or cancel() is called (stop is usually implemented as self.cancel(); await self).

src/frequenz/channels/_merge.py Show resolved Hide resolved
@shsms
Copy link
Contributor Author

shsms commented Dec 2, 2024

LGTM but the CI is failing, and this reminds me that it would be good to add tests at least for the receivers that actually do something when close() is called.

Yes, I'm already working on tests in this same branch, just made an early PR because the code is ready for review.

Maybe to make it play better with Service we can keep close() as you implemented it, as a "graceful" shutdown, and when we introduce Service we can use cancel() to drop the current buffers. Then maybe stop() can take an option to decide if close() or cancel() is called (stop is usually implemented as self.cancel(); await self).

I think stop() doesn't make sense for receivers, because it is not a task. Idiomatically, we should have stop() only for things that look like there must be an activity there. So it is great for Actor, etc. But for a stream, I like close() better.

@llucax
Copy link
Contributor

llucax commented Dec 3, 2024

I think stop() doesn't make sense for receivers, because it is not a task. Idiomatically, we should have stop() only for things that look like there must be an activity there. So it is great for Actor, etc. But for a stream, I like close() better.

Mmm, receiver don't need to have task, but many do (like Merger, Timer and FileWatcher). If we don't add a task-like interface, it means we can't stop these receivers using the Receiver interface, which is the only way to access them if we apply a filter or map to them for example.

But maybe there are other solutions to this. We can discuss this when the time comes.

@llucax
Copy link
Contributor

llucax commented Dec 3, 2024

Please let me know when this is ready for a final review :)

@shsms
Copy link
Contributor Author

shsms commented Dec 3, 2024

Mmm, receiver don't need to have task, but many do (like Merger, Timer and FileWatcher). If we don't add a task-like interface, it means we can't stop these receivers using the Receiver interface, which is the only way to access them if we apply a filter or map to them for example.

That doesn't sound like an issue, because people still call close() on the receiver, and if the underlying thing is a FileWatcher for example, the close() would just call the stop(), which is what this PR is doing already.

And calling close() on Map and Filter would call close() of the underlying receiver. This PR has that too.

@llucax
Copy link
Contributor

llucax commented Dec 3, 2024

That doesn't sound like an issue, because people still call close() on the receiver, and if the underlying thing is a FileWatcher for example, the close() would just call the stop(), which is what this PR is doing already.

But stop() is async, and it is important to be able to wait until the thing actually stopped to avoid those annoying warnings in test we were talking about 😆

If we make close() async, then fine, but then we are ACKing that receivers can have background running tasks, and then why shouldn't they follow the Service interface?

@llucax llucax added this to the v1.5.0 milestone Dec 10, 2024
@github-actions github-actions bot added part:docs Affects the documentation part:tests Affects the unit, integration and performance (benchmarks) tests labels Jan 7, 2025
@shsms
Copy link
Contributor Author

shsms commented Jan 7, 2025

This is ready, the only big change is that Receiver.close() is now async.

@shsms shsms requested a review from llucax January 7, 2025 10:39
@shsms shsms modified the milestones: v1.5.0, v1.6.0 Jan 8, 2025
@llucax
Copy link
Contributor

llucax commented Jan 9, 2025

Last push to have a (Background)Service-compatible interface.

If we make close() async, then fine, but then we are ACKing that receivers can have background running tasks, and then why shouldn't they follow the Service interface?

@llucax
Copy link
Contributor

llucax commented Jan 9, 2025

BTW, the method can't be abstract, otherwise this is a breaking change. We need to provide a default empty implementation I guess. We can create an issue to make the method abstract for 2.0.

Copy link
Contributor

@llucax llucax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't review everything, because I think the review will change a lot depending on what we do with close vs stop() (stop() being close()/cancel() + __await__).

cancel() and __await__ are actually the 2 primitives used by (Background)Service too, based on Task, and the more I look at it, the more I think receivers should follow this interface too.

src/frequenz/channels/file_watcher.py Show resolved Hide resolved
src/frequenz/channels/_anycast.py Show resolved Hide resolved
@shsms shsms force-pushed the close-receivers branch 3 times, most recently from 87e95f9 to 1cdba55 Compare January 13, 2025 10:54
@shsms
Copy link
Contributor Author

shsms commented Jan 13, 2025

I've made the close() method not abstract and not async. With these,

  • it is no longer a breaking change
  • it becomes a signal to close(), in keeping with the idioms of the asyncio library, which was the behaviour already even though it was async, and not a "ensure it is closed before returning".

With this the two above concerns should also go away.

@shsms shsms requested a review from llucax January 13, 2025 11:02
shsms added 4 commits January 13, 2025 14:15
Classes that implement the `Sender` or `Receiver` interfaces
currently need to override the `send` or the `ready` and `consume`
methods respectively.

As we add more methods, to these interfaces, it becomes hard to track
which methods are there for internal use and which ones are there to
implement the interface.

Using the `override` decorators helps with that.

Signed-off-by: Sahas Subramanian <[email protected]>
Also implement the method in all classes implementing the `Receiver`
interface.

Signed-off-by: Sahas Subramanian <[email protected]>
Signed-off-by: Sahas Subramanian <[email protected]>
@shsms
Copy link
Contributor Author

shsms commented Jan 13, 2025

Rebased on latest

@shsms
Copy link
Contributor Author

shsms commented Jan 15, 2025

@llucax could you take a look at this again?

Once the receiver's buffer is drained, trying to receive a message will raise a
[`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
"""
raise NotImplementedError("close() must be implemented by subclasses")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we don't forget:

@shsms shsms added this pull request to the merge queue Jan 16, 2025
Merged via the queue into frequenz-floss:v1.x.x with commit cd54c44 Jan 16, 2025
14 checks passed
@shsms shsms deleted the close-receivers branch January 16, 2025 14:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
part:channels Affects channels implementation part:core Affects the core types (`Sender`, `Receiver`, exceptions, etc.) part:docs Affects the documentation part:experimental Affects the experimental package part:synchronization Affects the synchronization of multiple sources (`select`, `merge`) part:tests Affects the unit, integration and performance (benchmarks) tests part:utilities Affects the utility receivers (`Timer`, `Event`, `FileWatcher`)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants